Crate shutdown_handler

A graceful shutdown handler that allows all parts of an application to trigger a shutdown.

Why?

An application I was maintaining was in charge of three different services:

  • A RabbitMQ processing service
  • A gRPC server
  • An HTTP metrics server

Our RabbitMQ node was restarted, so our connections dropped and the RabbitMQ processing service went into shutdown mode. However, due to a bug in our application layer, the application didn’t act on the failure immediately and kept handling gRPC and HTTP traffic. Thankfully, our alerts fired when the queue started backing up, and we manually restarted the application without any real impact.

Understandably, I never wanted this to happen again. We fixed the bug in the application, and then tackled the root cause: the other services were oblivious that a shutdown had happened.

Using this library, we’ve enforced that all service libraries take in a ShutdownHandler instance and use it to shut down gracefully. If any of them is about to crash, it will immediately raise a shutdown signal. The other services will then see that signal, finish whatever work they had started, and then shut down.
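The pattern itself can be sketched with nothing but the standard library. The snippet below is only an illustration of the idea, not how this crate is implemented: `ShutdownFlag`, `worker`, and `run_workers` are hypothetical names, and it uses threads and an atomic flag rather than async tasks. One worker fails and raises the signal; the others notice it between units of work and exit cleanly.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a shared shutdown signal (not the crate's type).
#[derive(Default)]
struct ShutdownFlag(AtomicBool);

impl ShutdownFlag {
    /// Raise the shutdown signal for every holder of this flag.
    fn shutdown(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Check whether any service has raised the signal.
    fn is_shutdown(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// A work loop that checks the flag between units of work.
/// If `fail_at` is set, the worker fails after that many units
/// and raises the signal before returning its error.
fn worker(flag: Arc<ShutdownFlag>, fail_at: Option<u32>) -> Result<u32, &'static str> {
    let mut done = 0;
    while !flag.is_shutdown() {
        if fail_at == Some(done) {
            flag.shutdown();
            return Err("worker failed");
        }
        // simulate one unit of work
        thread::sleep(Duration::from_millis(1));
        done += 1;
    }
    // The signal was raised elsewhere: the current unit is finished, so exit.
    Ok(done)
}

/// Spawn three workers sharing one flag; the last one fails after 5 units.
fn run_workers() -> Vec<Result<u32, &'static str>> {
    let flag = Arc::new(ShutdownFlag::default());
    let handles: Vec<_> = (0..3u32)
        .map(|id| {
            let flag = Arc::clone(&flag);
            let fail_at = if id == 2 { Some(5) } else { None };
            thread::spawn(move || worker(flag, fail_at))
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    for (id, result) in run_workers().into_iter().enumerate() {
        println!("worker {id}: {result:?}");
    }
}
```

Without the shared flag, the two healthy workers in this sketch would loop forever after the third one failed, which is the situation described above.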

Example

use std::pin::pin;
use std::sync::Arc;
use shutdown_handler::{ShutdownHandler, SignalOrComplete};

#[tokio::main]
async fn main() {
    // Create the shutdown handler
    let shutdown = Arc::new(ShutdownHandler::new());

    // Shutdown on SIGTERM
    shutdown.spawn_sigterm_handler().unwrap();

    // Spawn a few service workers
    let mut workers = tokio::task::JoinSet::new();
    for port in 0..4 {
        workers.spawn(service(Arc::clone(&shutdown), port));
    }

    // await all workers and collect the errors
    let mut errors = vec![];
    while let Some(result) = workers.join_next().await {
        // unwrap any JoinErrors that happen if the tokio task panicked
        let result = result.unwrap();

        // did our service error?
        if let Err(e) = result {
            errors.push(e);
        }
    }

    // only the service on port 3 fails; the others exit cleanly
    assert_eq!(errors, ["port closed"]);
}

// Define our services to loop on work and shut down gracefully

async fn service(shutdown: Arc<ShutdownHandler>, port: u16) -> Result<(), &'static str> {
    // a work loop that handles events
    for request in 0.. {
        let handle = pin!(handle_request(port, request));

        match shutdown.wait_for_signal_or_future(handle).await {
            // We finished handling the request without any interruptions. Continue
            SignalOrComplete::Completed(Ok(_)) => {}

            // There was an error handling the request, let's shutdown
            SignalOrComplete::Completed(Err(e)) => {
                shutdown.shutdown();
                return Err(e);
            }

            // There was a shutdown signal raised while handling this request
            SignalOrComplete::ShutdownSignal(handle) => {
                // We will finish handling the request but then exit
                return handle.await;
            }
        }
    }
    Ok(())
}

async fn handle_request(port: u16, request: usize) -> Result<(), &'static str> {
    // simulate some work being done
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;

    // simulate an error
    if port == 3 && request > 12 {
        Err("port closed")
    } else {
        Ok(())
    }
}

Structs

Enums

  • SignalOrComplete: Reports whether a future managed to complete without interruption, or if there was a shutdown signal